分布式任务调度系统分发及负载均衡实现方案
应用场景
设计实现一个分布式任务调度系统,要考虑实现定时任务、延迟任务、任务状态维护、任务调度分发等方面,而调度分发最常用的策略就是负载均衡。
负载均衡算是分布式应用中最基础手段,在DNS接入层,网关层如硬件F5,软件LVS,nginx等均有实现。而任务调度分发、消息调度分发,rpc调用选择等设计中也常用负载均衡。
实现负载均衡方案
常用的负载均衡算法主要有:随机算法,轮询算法,加权轮询算法、hash算法、一致性hash算法,其他还有最短响应时间,最少连接数等。
1.随机算法
1.1 实现原理
从0-N个获取一个随机数,取出随机数对应的节点。
1.2 代码实现
//list store all the node
type RandomBalance struct {
curIdx int
allNodes []string
}
//add node
func (r *RandomBalance) Add(params ...string) error {
if len(params) == 0 {
return errors.New("param len 1 at least")
}
addr := params[0]
r.allNodes = append(r.allNodes, addr)
return nil
}
//get node
func (r *RandomBalance) Get() (string, error) {
if len(r.allNodes) == 0 {
return "", errors.New("allNodes is empty")
}
r.curIdx = rand.Intn(len(r.allNodes)) //use rand to generate random
return r.allNodes[r.curIdx], nil
}
2.轮询算法
2.1 实现原理
依次从0,1,2...N获取服务,机会均等的轮流请求/分发任务。当指针到达末尾N时,重新回到第1个。
2.2 代码实现
//list store all the node
type RoundRobinBalance struct {
curIndex int
allNodes []string
}
//add node
func (r *RoundRobinBalance) Add(params ...string) error {
r.allNodes = append(r.allNodes, params[0])
return nil
}
//get node
func (r *RoundRobinBalance) Get() (string, error) {
if len(r.allNodes) == 0 {
return "", errors.New("list is empty")
}
lens := len(r.allNodes)
//move to beginning node
if r.curIndex >= lens {
r.curIndex = 0
}
curNode := r.allNodes[r.curIndex]
//move to next node
r.curIndex = (r.curIndex + 1) % lens
return curNode, nil
}
3.加权轮询算法
3.1 实现原理
每个节点设置对应的权重,权重越大可能被选中的次数越高,某节点被选中的次数≈(本节点权重/全部权重) * 总分配次数。
举例说明:
3个节点对应权重为 node:weight [a=1,b=2,c=5]
currentWeight 代表每次请求节点的当前权重,为currentWeight+weight
totalWeight 代表所有节点初始权重之和 1+2+5=8
第一次请求:
currentWeight 为 [a=0+1,b=0+2,c=0+5] ,选中最大的c做为本次输出,之后c节点的权重需要减去totalWeightcurrent,调整后 [a=1,b=2,c=5-8] 也就是 [a=1,b=2,c=-3]
第二次请求:
currentWeight 为 [a=1+1,b=2+2,c=-3+5] 结果为 [a=2,b=4,c=2],选中最大的b作为本次输出,之后节点权重变更为 [a=2,b=-4,c=2]
第三次请求:
currentWeight 为 [a=2+1,b=-4+2,c=2+5] 结果为 [a=3,b=-2,c=7],又轮到c(权重大的好处体现出来了),之后节点权重变更为 [a=3,b=-2,c=-1]
第四次请求:
[a=3,b=-2,c=-1] 加权后[a=4,b=0,c=4],a与c相等,优先选前者输出a
3.2 代码实现
//node list
type WeightRoundRobinBalance struct {
curIdx int
allNodes []*WeightNode
}
//weight node
type WeightNode struct {
node string
weight int //init weight
currentWeight int //every round weight
}
//add node
func (r *WeightRoundRobinBalance) Add(params ...string) error {
if len(params) != 2 {
return errors.New("param len need 2")
}
parInt, err := strconv.ParseInt(params[1], 10, 64)
if err != nil {
return err
}
node := &WeightNode{node: params[0], weight: int(parInt)}
r.allNodes = append(r.allNodes, node)
return nil
}
//get node
func (r *WeightRoundRobinBalance) Get() (string, error) {
totalWeight := 0
var bestNode *WeightNode
for i := 0; i < len(r.allNodes); i++ {
curNode := r.allNodes[i]
totalWeight += curNode.weight
curNode.currentWeight += curNode.weight
//choose the largest weight
if bestNode == nil || curNode.currentWeight > bestNode.currentWeight {
bestNode = curNode
}
}
if bestNode == nil {
return "", errors.New("get error")
}
bestNode.currentWeight -= totalWeight
return bestNode.node, nil
}
//test
func main() {
weightLb := loadbalance.LoadBalanceFactory(loadbalance.Weight)
weightLb.Add("a", "1")
weightLb.Add("b", "2")
weightLb.Add("c", "5")
for i := 0; i < 20; i++ {
weightRs, _ := weightLb.Get()
fmt.Println(weightRs)
}
}
测试输出
func main() {
weightLb := loadbalance.LoadBalanceFactory(loadbalance.Weight)
weightLb.Add("a", "1")
weightLb.Add("b", "2")
weightLb.Add("c", "5")
var count = make(map[string]int)
for i := 0; i < 200000; i++ {
weightRs, _ := weightLb.Get()
count[weightRs]++
}
fmt.Println(count)
}
测试输出
比例为1:2:5。
4.工厂模式封装
定义接口:
type LoadBalance interface {
Add(...string) error
Get() (string, error)
}
定义工厂生成器:
const (
Random = iota
RoundRobin
Weight
)
func LoadBalanceFactory(lbType int) LoadBalance {
switch lbType {
case Random:
return new(RandomBalance)
case RoundRobin:
return new(RoundRobinBalance)
case Weight:
return new(WeightRoundRobinBalance)
default:
return new(RoundRobinBalance)
}
}
分布式服务列表维护
分布式调度分发关键是要维护一个有效服务(worker)列表,然后按负载均衡策略对服务列表进行任务分发。
1. 反向代理
使用反向代理如nginx作为对worker服务进行负载均衡代理,同时也对请求做路由转发,这种方式会多一跳。
2. 调度分发协调者
从任务调度分发器中选举一个协调者,选举过程可用consul或zookeeper管理。协调者负责维护有效服务列表,分发路由策略,故障隔离或转移。
任务调度器分发任务前,先从协调者获取服务地址,然后进行任务分发,协调者本身可以作为任务分发者共同参与,也可只负责协调不处理任务,协调者如果发生故障,可由consul重新选举新的协调者。
此架构中,如果worker服务发生故障,需要协调者能及时发现并从服务列表中移除故障节点,进行故障隔离或故障转移。协调者与服务者可以通过心跳保持或服务探针探活方案进行故障检测。
2.1 协调者与服务者心跳保持方案
服务提供方发送心跳包,协调者接收判活,并根据过期时间清理过期服务地址。
服务提供者每秒发送一个心跳包,通过mq(这里选rabbitmq)发送服务心跳。
//send heart beat every second
func StartHeartbeat() {
mq := rabbimt.NewMQ()
defer mq.Close()
mq.BindExchange("heart-beat-exchange")
for {
mq.Send("heart-beat-queue", addr)
time.Sleep(1 * time.Second) //1 can be config
}
}
协调者开启监听接收心跳包更新服务时间,清理过期服务,定义10s未更新时间就过期。
//global variable
var addrList = make(map[string]time.Time)
var mutex sync.Mutex
//listen and revice heart beat
func ListenHeartbeat() {
mq := rabbimt.NewMQ()
defer mq.Close()
mq.BindExchange("heart-beat-exchange")
go clearExpiredAddr() //clear
msgList := mq.ReceiveMsg()
for msg := range msgList {
addr, err := strconv.Unquote(string(msg.Body))
if err != nil {
panic(err)
}
mutex.Lock()
addrList[addr] = time.Now()
mutex.Unlock()
}
}
//clear expired addr,10 seconds no reply
func clearExpiredAddr() {
for {
time.Sleep(1 * time.Second)
mutex.Lock()
for addr, lastTime := range addrList {
if lastTime.Add(10 * time.Second).Before(time.Now()) { //10 shoud be config
delete(addrList, addr)
log.Println(addr + " expire removed!")
}
}
mutex.Unlock()
}
}
2.2 协调者服务探活方案设计
协调者发送请求到服务提供者探针接口,如正常返回放入服务列表中,当请求失败达到阈值从列表中移除,协调者起到服务健康检查的效果。
使用ticker创建定时检查任务:
//health check
func HealthCheck() {
ticker := time.NewTicker(time.Second * 5) //every 5 second send a ping request
for {
select {
case <-ticker.C:
log.Println("start health check...")
loopCheck()
log.Println("health check end")
}
}
}
维护全局addrList,遍历检查addr,通过get请求确认是否正常(可以判断返回内容是否正常,这里只判断http状态是否为200),超过最大失败次数则从addrList中删除。
var addrList = make([]string, 0)
var failCount = make(map[string]int)
var mutex sync.Mutex
var maxRetrytimes int = 3
//get request keep alive
func pingCheck(addr string) bool {
resp, err := http.Get(addr)
if err != nil {
return false
}
defer resp.Body.Close()
fmt.Println(resp.StatusCode)
if resp.StatusCode != 200 {
return false
}
return true
}
//loop addrlist and check alive
func loopCheck() {
for idx, addr := range addrList {
if pingCheck(addr) {
fmt.Println("ok", addr)
continue
}
mutex.Lock()
failCount[addr]++
if failCount[addr] > maxRetrytimes {
addrList = append(addrList[:idx], addrList[idx+1:]...)
log.Println("remove", addr)
}
mutex.Unlock()
}
}
请求失败超阈值,也可以置为失效并不移除,并间隔一段时间进行重试,如果恢复则重新置为可用。最后获取有效的addrList,并根据负载均衡策略进行选择。
//add/init addrlist
func AddAddr(addr ...string) {
mutex.Lock()
defer mutex.Unlock()
addrList = append(addrList, addr...)
}
//get alive addrlist
func GetAliveAddrList() []string {
mutex.Lock()
defer mutex.Unlock()
return addrList
}
测试服务健康检查执行结果如下:
实现一个分布式调度系统,涉及定时、延迟任务等更多技术细节会在后续文章中依次发布,欢迎订阅我的公众号:技术岁月。
文章相关代码实现:
https://github.com/skyhackvip/dispatcher